In [2]:
%run startup.py
In [3]:
%%javascript
$.getScript('./assets/js/ipython_notebook_toc.js')
source: http://reactivex.io/documentation/operators.html#tree.
(transcribed to RxPY 1.5.7, Py2.7 / 2016-12, Gunther Klessinger, axiros)
This tree can help you find the ReactiveX Observable operator you’re looking for.
See Part 1 for Usage and Output Instructions.
This part also assumes familiarity with the marble diagrams feature of RxPY.
This is a helpful accompanying read.
In [6]:
reset_start_time(O.merge)
l = []
def excepting_f(obs):
    for i in range(10):
        l.append(1)
        obs.on_next(1 / (3 - len(l)))
stream1 = O.from_(('a', 'b', 'c'))
stream2 = O.create(excepting_f)
# merged stream stops in any case at first exception!
# No guarantee of order of those immediately created streams !
d = subs(stream1.merge(stream2))
l = []
d = subs(O.merge(new_thread_scheduler, [stream1, stream2]))
In [167]:
rst(O.merge_all, title='merge_all')
meta = O.repeat(O.from_((1, 2, 3)), 3)
# no guarantee of order, immediately created:
d = subs(meta.merge_all())
# Introducing delta ts:
d = subs(O.repeat(O.timer(10, 10)\
                   .take(3), 3)\
         .merge_all(),
         name='streams with time delays between events')
In [168]:
rst(O.concat)
s1 = O.from_((1, 2))
s2 = O.from_((3, 4))
# while normal subscriptions work as expected...
d1, d2 = subs(s1), subs(s2)
# ... another one can have the order reversed
d = subs(O.concat([s2, s1]))
In [169]:
rst()
# See the marbles notebook:
s1 = O.from_marbles('1--2---3|').to_blocking()
s2 = O.from_marbles('--a-b-c|' ).to_blocking()
d = (subs(s1, name='A'),
subs(s2, name='B'))
rst(title="Concatenating in reverse order", sleep=1)
d = subs(O.concat([s2, s1]), name='C')
In [176]:
rst(O.zip)
s1 = O.range(0, 5)
d = subs(O.zip(s1, s1.skip(1), s1.skip(2), lambda s1, s2, s3: '%s : %s : %s' % (s1, s2, s3)))
In [179]:
rst(O.zip_list) # alias: zip_array
s1 = O.range(0, 5)
d = subs(O.zip_list(s1, s1.skip(1), s1.skip(2)))
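The positional pairing done by zip and zip_list can be sketched without Rx. The snippet below is a plain-Python model, not RxPY code: the list `source` stands in for `O.range(0, 5)` (which emits 0 to 4), and the helper `skip` is a hypothetical stand-in for `.skip(n)`.

```python
from itertools import islice

# Stand-in for O.range(0, 5): the items 0, 1, 2, 3, 4.
source = list(range(0, 5))

def skip(items, n):
    """Hypothetical model of .skip(n): drop the first n items."""
    return islice(items, n, None)

# zip pairs items by position and ends with the shortest input,
# so the 5-item source yields only 3 triples here.
triples = list(zip(source, skip(source, 1), skip(source, 2)))

# Same formatting as the result selector passed to O.zip above.
formatted = ['%s : %s : %s' % t for t in triples]
print(formatted)
```

The list `triples` corresponds to what zip_list delivers (one collection per step), while `formatted` corresponds to zip with a result selector.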
In [17]:
rst(O.combine_latest, title='combine_latest')
s1 = O.interval(100).map(lambda i: 'First : %s' % i)
s2 = O.interval(150).map(lambda i: 'Second: %s' % i)
# the start is interesting, both must have emitted, so it starts at 150ms with 0/0:
d = subs(s1.combine_latest(s2, lambda s1, s2: '%s, %s' % (s1, s2)).take(6))
rst(title='For comparison: merge', sleep=1)
d = subs(s1.merge(s2).take(6))
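To see why the combined stream only starts at 150 ms, the timing can be modelled in plain Python. This is a simplified sketch, not RxPY code: `interval_events` and this `combine_latest` function are hypothetical helpers, and events that fall on the same millisecond are ordered by label, which the real operator does not guarantee.

```python
def interval_events(period_ms, count, label):
    """Model of O.interval(period_ms).take(count): (time_ms, label, index)."""
    return [(period_ms * (i + 1), label, i) for i in range(count)]

def combine_latest(first, second):
    """Return (time_ms, latest_first, latest_second) for every event that
    arrives once both inputs have emitted at least once."""
    latest = {}
    out = []
    # sorted() orders by time; ties fall back to the label, which is an
    # arbitrary choice of this model.
    for t, label, index in sorted(first + second):
        latest[label] = index
        if len(latest) == 2:
            out.append((t, latest['first'], latest['second']))
    return out

first = interval_events(100, 5, 'first')
second = interval_events(150, 5, 'second')
combined = combine_latest(first, second)[:6]   # mirrors .take(6)
for row in combined:
    print('%4d ms  First: %s, Second: %s' % row)
```

In this model the first row is `(150, 0, 0)`: the event at 100 ms is held back because the second stream has not emitted yet.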
In [16]:
rst(O.with_latest_from, title='with_latest_from')
s1 = O.interval(140).map(lambda i: 'First : %s' % i)
s2 = O.interval(50) .map(lambda i: 'Second: %s' % i)
d = subs(s1.with_latest_from(s2, lambda s1, s2: '%s, %s' % (s1, s2)).take(6))
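In contrast to combine_latest, with_latest_from emits only when the source stream emits. A plain-Python model of the timing used above (140 ms source, 50 ms other) might look as follows; `interval_events` and this `with_latest_from` function are hypothetical helpers, not part of RxPY.

```python
def interval_events(period_ms, count):
    """Model of O.interval(period_ms).take(count): (time_ms, index)."""
    return [(period_ms * (i + 1), i) for i in range(count)]

def with_latest_from(source, other):
    """For each source event, pair it with the most recent 'other' value
    that arrived strictly earlier; drop it if there is none yet."""
    out = []
    for t, value in source:
        seen = [v for (t_other, v) in other if t_other < t]
        if seen:
            out.append((t, value, seen[-1]))
    return out

source = interval_events(140, 4)   # events at 140, 280, 420, 560 ms
other = interval_events(50, 12)    # events at 50, 100, ..., 600 ms
pairs = with_latest_from(source, other)
for row in pairs:
    print('%4d ms  First: %s, Second: %s' % row)
```

Only four rows are produced, one per source event, and most values of the faster stream never show up in the result.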
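The join operator takes four parameters: the second Observable to join with, a function that returns an Observable whose lifetime determines how long an item from the first stream stays valid, the same kind of function for items from the second stream, and a result selector that combines one item from each stream.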
In [23]:
rst(O.join)
# This one is timing-critical: output may appear swallowed when two threads (over)write it.
# Better try this with timer(0) on the console. The scheduler of the timers matters as well,
# try other O.timer schedulers...
xs = O.interval(100).map(lambda i: 'First : %s' % i)
ys = O.interval(101).map(lambda i: 'Second: %s' % i)
d = subs(xs.join(ys, lambda _: O.timer(10), lambda _: O.timer(0), lambda x, y: '%s %s' % (x, y)).take(5))
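The four arguments passed to join above (the second stream, two duration selectors and a result selector) can be illustrated with a simplified plain-Python model. It is a sketch under stated assumptions, not RxPY code: the function `join` below is hypothetical, each item is assumed to stay "open" for a fixed number of milliseconds, and a zero-length window never catches a later arrival.

```python
def join(left, right, left_ms, right_ms):
    """left/right: lists of (time_ms, value). An item stays open for
    left_ms / right_ms after it arrives. A pair is produced when one item
    arrives while an item from the other side is still open."""
    out = []
    for tl, x in left:
        for tr, y in right:
            if tl <= tr < tl + left_ms or tr <= tl < tr + right_ms:
                # The pair becomes available when the later item arrives.
                out.append((max(tl, tr), '%s %s' % (x, y)))
    return sorted(out)

# Models of O.interval(100) and O.interval(101), five items each.
xs = [(100 * (i + 1), 'First : %s' % i) for i in range(5)]
ys = [(101 * (i + 1), 'Second: %s' % i) for i in range(5)]

# 10 ms window for left items, 0 ms for right items, as in the cell above.
joined = join(xs, ys, 10, 0)
for t, text in joined:
    print('%4d ms  %s' % (t, text))
```

Because the second stream drifts by 1 ms per item, each right item still falls inside the 10 ms window of its left counterpart for these first five items; with longer runs the drift would eventually exceed the window and the pairs would stop.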
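The groupJoin operator takes four parameters: the second Observable to join with, a duration selector for items from the first stream, a duration selector for items from the second stream, and a result selector that receives an item from the first stream together with an Observable of the matching items from the second stream.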
In [27]:
rst(O.group_join, title='group_join')
xs = O.interval(100).map(lambda i: 'First : %s' % i)
ys = O.interval(100).map(lambda i: 'Second: %s' % i)
d = subs(xs.group_join(ys,
lambda _: O.timer(0),
lambda _: O.timer(0),
lambda x, yy: yy.select(lambda y: '%s %s' % (x, y))).merge_all().take(5))
The combination of the And, Then, and When operators behaves much like the Zip operator, but does so by means of intermediate data structures. And accepts two or more Observables and combines the emissions from each, one set at a time, into Pattern objects. Then operates on such Pattern objects, transforming them into a Plan. When in turn transforms these various Plan objects into emissions from an Observable.
The And/Then/When trio has more overloads that enable you to group an even greater number of sequences. They also allow you to provide more than one 'plan' (the output of the Then method). This gives you the Merge feature but on the collection of 'plans'. I would suggest playing around with them if this functionality is of interest to you. The verbosity of enumerating all of the combinations of these methods would be of low value. You will get far more value out of using them and discovering for yourself.
As we delve deeper into the depths of what the Rx libraries provide us, we can see more practical usages for it. Composing sequences with Rx allows us to easily make sense of the multiple data sources a problem domain is exposed to. We can concatenate values or sequences together sequentially with StartWith, Concat and Repeat. We can process multiple sequences concurrently with Merge, or process a single sequence at a time with Amb and Switch. Pairing values with CombineLatest, Zip and the And/Then/When operators can simplify otherwise fiddly operations like our drag-and-drop examples and monitoring system status.
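The zip-like pairing of And/Then/When can be modelled in plain Python with one queue per source. This is a minimal sketch, not RxPY code: `when_all` is a hypothetical helper, each value is simply the time at which it was created, and events on the same millisecond are processed in source order.

```python
from collections import deque

def when_all(*sources):
    """sources: lists of (time_ms, value). Buffer each source in its own
    queue and emit one tuple as soon as every queue holds a value."""
    events = sorted((t, idx, v)
                    for idx, src in enumerate(sources)
                    for t, v in src)
    queues = [deque() for _ in sources]
    out = []
    for t, idx, v in events:
        queues[idx].append(v)
        if all(queues):                       # every source has a buffered item
            out.append((t, tuple(q.popleft() for q in queues)))
    return out

def interval_events(period_ms, count):
    """Model of O.interval(period_ms).take(count); value = creation time."""
    return [(period_ms * (i + 1), period_ms * (i + 1)) for i in range(count)]

one = interval_events(1000, 5)
two = interval_events(500, 5)
three = interval_events(100, 5)
results = when_all(one, two, three)
for emitted_at, created in results:
    print('emitted at %d ms, items created at %s' % (emitted_at, created))
```

Each output tuple is emitted when the slowest source advances, while the items taken from the faster sources were created earlier and waited in their queues.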
In [59]:
rst()
# see the similarity to zip.
ts = time.time()
def _dt():
    # giving us info when an element was created:
    return 'from time: %.2f' % (time.time() - ts)
one = O.interval(1000) .map(lambda i: 'Seconds : %s %s' % (i, _dt())).take(5)
two = O.interval(500) .map(lambda i: 'HalfSecs: %s %s' % (i, _dt())).take(5)
three = O.interval(100).map(lambda i: '10thS : %s %s' % (i, _dt())).take(5)
z = O.when(
one \
.and_(two) \
.and_(three)\
.then_do(lambda a, b, c: '\n'.join(('', '', a, b, c))))
# from the output you see that the result stream consists of elements built at each interval
# (which is in the past for 'two' and 'three'),
# buffered until the 1 second sequence 'one' advances a step.
d = subs(z)
In [71]:
rst(O.switch_latest)
s = O.range(0, 3).select(lambda x: O.range(x, 3)\
# showing from which stream our current value comes:
.map(lambda v: '%s (from stream nr %s)' % (v, x)))\
.switch_latest()
d = subs(s)